package org.codehaus.activemq;

import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.management.j2ee.statistics.Stats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.management.JMSConsumerStatsImpl;
import org.codehaus.activemq.management.StatsCapable;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
import org.codehaus.activemq.selector.SelectorParser;

public class ActiveMQMessageConsumer
  implements MessageConsumer, StatsCapable
{
  private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
  protected ActiveMQSession session;
  protected String consumerId;
  protected MemoryBoundedQueue messageQueue;
  protected String messageSelector;
  private MessageListener messageListener;
  protected String consumerName;
  protected ActiveMQDestination destination;
  private boolean closed;
  protected int consumerNumber;
  protected int prefetchNumber;
  protected long startTime;
  protected boolean noLocal;
  protected boolean browser;
  private Thread accessThread;
  private Object messageListenerGuard;
  private JMSConsumerStatsImpl stats;

  protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name, String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue)
    throws JMSException
  {
    if (dest == null) {
      throw new InvalidDestinationException("Do not understand a null destination");
    }
    if (dest.isTemporary())
    {
      String physicalName = dest.getPhysicalName();
      if (physicalName == null) {
        throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
      }
      String clientID = theSession.connection.clientID;
      if (physicalName.indexOf(clientID) < 0) {
        throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
      }
    }
    if (selector != null) {
      selector = selector.trim();
      if (selector.length() > 0)
      {
        new SelectorParser().parse(selector);
      }
    }
    this.session = theSession;
    this.destination = dest;
    this.consumerName = name;
    this.messageSelector = selector;

    this.consumerNumber = cnum;
    this.prefetchNumber = prefetch;
    this.noLocal = noLocalValue;
    this.browser = browserValue;
    this.startTime = System.currentTimeMillis();
    this.messageListenerGuard = new Object();
    String queueName = theSession.connection.clientID + ":" + name;
    queueName = queueName + ":" + cnum;
    this.messageQueue = theSession.connection.getMemoryBoundedQueue(queueName);
    this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
    this.session.addConsumer(this);
  }

  public Stats getStats()
  {
    return this.stats;
  }

  public JMSConsumerStatsImpl getConsumerStats()
  {
    return this.stats;
  }

  public String toString()
  {
    return "MessageConsumer: " + this.consumerId;
  }

  public int getPrefetchNumber()
  {
    return this.prefetchNumber;
  }

  public void setPrefetchNumber(int prefetchNumber)
  {
    this.prefetchNumber = prefetchNumber;
  }

  public String getMessageSelector()
    throws JMSException
  {
    checkClosed();
    return this.messageSelector;
  }

  public MessageListener getMessageListener()
    throws JMSException
  {
    checkClosed();
    return this.messageListener;
  }

  public void setMessageListener(MessageListener listener)
    throws JMSException
  {
    checkClosed();
    this.messageListener = listener;
  }

  public Message receive()
    throws JMSException
  {
    checkClosed();
    try {
      this.accessThread = Thread.currentThread();
      ActiveMQMessage message = (ActiveMQMessage)this.messageQueue.dequeue();
      this.accessThread = null;
      messageDelivered(message, true);
      return message;
    } catch (InterruptedException ioe) {
    }
    return null;
  }

  public Message receive(long timeout)
    throws JMSException
  {
    checkClosed();
    try {
      if (timeout == 0L) {
        return receive();
      }
      this.accessThread = Thread.currentThread();
      ActiveMQMessage message = (ActiveMQMessage)this.messageQueue.dequeue(timeout);
      this.accessThread = null;

      messageDelivered(message, true);
      return message;
    } catch (InterruptedException ioe) {
    }
    return null;
  }

  public Message receiveNoWait()
    throws JMSException
  {
    checkClosed();
    if (this.messageQueue.size() > 0) {
      try {
        ActiveMQMessage message = (ActiveMQMessage)this.messageQueue.dequeue();
        messageDelivered(message, true);
        return message;
      }
      catch (InterruptedException ioe) {
        throw new JMSException("Queue is interrupted: " + ioe.getMessage());
      }
    }
    return null;
  }

  public void close()
    throws JMSException
  {
    try
    {
      this.accessThread.interrupt();
    }
    catch (NullPointerException npe) {
    }
    catch (SecurityException se) {
    }
    this.session.removeConsumer(this);
    this.messageQueue.close();
    this.closed = true;
  }

  public boolean isDurableSubscriber()
  {
    return ((this instanceof ActiveMQTopicSubscriber)) && (this.consumerName != null) && (this.consumerName.length() > 0);
  }

  protected void checkClosed()
    throws IllegalStateException
  {
    if (this.closed)
      throw new IllegalStateException("The Consumer is closed");
  }

  protected void processMessage(ActiveMQMessage message)
  {
    message.setConsumerId(this.consumerId);
    MessageListener listener = null;
    synchronized (this.messageListenerGuard) {
      listener = this.messageListener;
    }
    try {
      if (!this.closed) {
        if (listener != null) {
          listener.onMessage(message);
          messageDelivered(message, true);
        }
        else {
          this.messageQueue.enqueue(message);
        }
      }
      else
        messageDelivered(message, false);
    }
    catch (Exception e)
    {
      log.warn("could not process message: " + message, e);

      messageDelivered(message, false);
    }
  }

  protected String getConsumerId()
  {
    return this.consumerId;
  }

  protected void setConsumerId(String consumerId)
  {
    this.consumerId = consumerId;
  }

  protected String getConsumerName()
  {
    return this.consumerName;
  }

  protected void setConsumerName(String value)
  {
    this.consumerName = value;
  }

  protected int getConsumerNumber()
  {
    return this.consumerNumber;
  }

  protected void setConsumerNumber(int value)
  {
    this.consumerNumber = value;
  }

  protected boolean isNoLocal()
  {
    return this.noLocal;
  }

  protected boolean isBrowser()
  {
    return this.browser;
  }

  protected void setBrowser(boolean value)
  {
    this.browser = value;
  }

  protected ActiveMQDestination getDestination()
  {
    return this.destination;
  }

  protected long getStartTime()
  {
    return this.startTime;
  }

  private void messageDelivered(ActiveMQMessage message, boolean messageRead) {
    boolean read = this.browser ? false : messageRead;
    this.session.messageDelivered((isDurableSubscriber()) || (this.destination.isQueue()), message, read, true);
    if (messageRead)
      this.stats.onMessage(message);
  }
}